// source: https://github.com/nanomsg/nng/blob/master/demo/async/server.c
// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitoar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt).  A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//

// This program serves as an example for how to write an async RPC service,
// using the request/reply pattern and contexts (nng_ctx(5)).  The server
// allocates a number of contexts up front, which determines the amount of
// parallelism possible.  The callbacks are handled asynchronously, so
// this could be done by threads, or something similar.  For our uses we
// make use of an event driven architecture that we already have available.

// Our demonstration application layer protocol is simple.  The client sends
// a number of milliseconds to wait before responding.  The server just gives
// back an empty reply after waiting that long.

// To run this program, start the server as async_demo <url> -s
// Then connect to it with the client as async_client <url> <msec>.
//
//  For example:
//
//  % ./server tcp://127.0.0.1:5555 &
//  % ./client tcp://127.0.0.1:5555 323
//  Request took 324 milliseconds.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/supplemental/util/platform.h>

// Parallel is the maximum number of outstanding requests we can handle.
// This is *NOT* the number of threads in use, but instead represents
// outstanding work items.  Select a small number to reduce memory size.
// (Each one of these can be thought of as a request-reply loop.)  Note
// that you will probably run into limitations on the number of open file
// descriptors if you set this too high. (If not for that limit, this could
// be set in the thousands, each context consumes a couple of KB.)
#ifndef PARALLEL
#    define PARALLEL 128
#endif

// The server keeps a list of work items, sorted by expiration time,
// so that we can use this to set the timeout to the correct value for
// use in poll.
struct work {
    enum { INIT, RECV, WAIT, SEND } state;
    nng_aio *aio;
    nng_msg *msg;
    nng_ctx ctx;
};

void fatal(const char *func, int rv) {
    fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
    exit(1);
}

void server_cb(void *arg) {
    struct work *work = (struct work *) arg;
    nng_msg *msg;
    int rv;
    uint32_t when;

    switch (work->state) {
        case work::INIT:
            work->state = work::RECV;
            nng_ctx_recv(work->ctx, work->aio);
            break;
        case work::RECV:
            if ((rv = nng_aio_result(work->aio)) != 0) {
                fatal("nng_ctx_recv", rv);
            }
            msg = nng_aio_get_msg(work->aio);
            if ((rv = nng_msg_trim_u32(msg, &when)) != 0) {
                // bad message, just ignore it.
                nng_msg_free(msg);
                nng_ctx_recv(work->ctx, work->aio);
                return;
            }
            work->msg = msg;
            work->state = work::WAIT;
            nng_sleep_aio(when, work->aio);
            break;
        case work::WAIT:
            // We could add more data to the message here.
            nng_aio_set_msg(work->aio, work->msg);
            work->msg = nullptr;
            work->state = work::SEND;
            nng_ctx_send(work->ctx, work->aio);
            break;
        case work::SEND:
            if ((rv = nng_aio_result(work->aio)) != 0) {
                nng_msg_free(work->msg);
                fatal("nng_ctx_send", rv);
            }
            work->state = work::RECV;
            nng_ctx_recv(work->ctx, work->aio);
            break;
        default:
            fatal("bad state!", NNG_ESTATE);
            break;
    }
}

struct work *alloc_work(nng_socket sock) {
    struct work *w;
    int rv;

    if ((w = (struct work *) nng_alloc(sizeof(*w))) == nullptr) {
        fatal("nng_alloc", NNG_ENOMEM);
    }
    if ((rv = nng_aio_alloc(&w->aio, server_cb, w)) != 0) {
        fatal("nng_aio_alloc", rv);
    }
    if ((rv = nng_ctx_open(&w->ctx, sock)) != 0) {
        fatal("nng_ctx_open", rv);
    }
    w->state = work::INIT;
    return (w);
}

// The server runs forever.
int server(const char *url) {
    nng_socket sock;
    struct work *works[PARALLEL];
    int rv;
    int i;

    /*  Create the socket. */
    rv = nng_rep0_open(&sock);
    if (rv != 0) {
        fatal("nng_rep0_open", rv);
    }

    for (i = 0; i < PARALLEL; i++) {
        works[i] = alloc_work(sock);
    }

    if ((rv = nng_listen(sock, url, nullptr, 0)) != 0) {
        fatal("nng_listen", rv);
    }

    for (i = 0; i < PARALLEL; i++) {
        server_cb(works[i]); // this starts them going (INIT state)
    }

    for (;;) {
        nng_msleep(3600000); // neither pause() nor sleep() portable
    }
}

int main(int argc, char **argv) {
    int rc;

    if (argc != 2) {
        fprintf(stderr, "Usage: %s <url>\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    rc = server(argv[1]);
    exit(rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
}
